// Copyright tang.  All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package com.gitee.dbswitch.admin.execution;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.gitee.dbswitch.admin.config.ExecutorConfig;
import com.gitee.dbswitch.admin.dao.APIAssignmentConfigDAO;
import com.gitee.dbswitch.admin.dao.AssignmentConfigDAO;
import com.gitee.dbswitch.admin.dao.AssignmentJobDAO;
import com.gitee.dbswitch.admin.dao.AssignmentTaskDAO;
import com.gitee.dbswitch.admin.entity.AssignmentConfigEntity;
import com.gitee.dbswitch.admin.entity.APIAssignmentConfigEntity;
import com.gitee.dbswitch.admin.entity.AssignmentJobEntity;
import com.gitee.dbswitch.admin.entity.AssignmentTaskEntity;
import com.gitee.dbswitch.admin.logback.LogbackAppenderRegister;
import com.gitee.dbswitch.admin.type.ConfigTypeEnum;
import com.gitee.dbswitch.admin.type.JobStatusEnum;
import com.gitee.dbswitch.common.entity.MdcKeyValue;
import com.gitee.dbswitch.data.config.DbswtichPropertiesConfiguration;
import com.gitee.dbswitch.data.service.APIMigrationService;
import com.gitee.dbswitch.data.service.MigrationService;
import com.gitee.dbswitch.data.util.JsonUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.sql.Timestamp;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;

import javax.swing.*;

/**
 * Runnable that executes a single job run for one assignment task.
 *
 * <p>On each {@link #run()} a new job record is created through {@link AssignmentJobDAO},
 * a per-task lock is acquired so that runs sharing the same task id do not execute
 * concurrently inside this JVM, the migration is executed, and finally the job record is
 * updated with the resulting status, error log and finish time.
 *
 * <p>{@link #interrupt()} may be called from a different thread than the one executing
 * {@link #run()}; the fields shared between the two are therefore declared {@code volatile}.
 */
@Slf4j
public class ExecuteJobTaskRunnable implements Runnable {

    private static final String MDC_KEY = LogbackAppenderRegister.LOG_MDC_KEY_NAME;

    // Cache of fine-grained locks, keyed by task id, used to prevent concurrent execution
    // of runs that belong to the same task.
    // NOTE(review): entries expire 24h after being written; a run holding its lock for longer
    // than that could see a second run obtain a fresh lock instance - confirm this is acceptable.
    private static final Cache<String, ReentrantLock> mutexes = CacheBuilder.newBuilder()
            .expireAfterWrite(24 * 60L, TimeUnit.MINUTES)
            .build();

    // Set by interrupt(), polled by run(); volatile because the two run on different threads.
    private volatile boolean interrupted = false;

    // Assigned inside run() and read by interrupt(); volatile for cross-thread visibility.
    private volatile MigrationService migrationService;

    private volatile APIMigrationService apiMigrationService;

    private final AssignmentTaskDAO assignmentTaskDAO;

    private final AssignmentConfigDAO assignmentConfigDAO;

    private final APIAssignmentConfigDAO apiAssignmentConfigDAO;

    private final AssignmentJobDAO assignmentJobDAO;

    private final AsyncTaskExecutor readerTaskExecutor;

    private final AsyncTaskExecutor writerTaskExecutor;

    private final AsyncTaskExecutor apiTaskExecutor;

    @Getter
    private final Long taskId;

    private final Integer schedule;

    private final String keyName;

    private final Integer configType;

    /**
     * Creates a runnable for the given task. All collaborators (DAOs and executors) are looked
     * up from the Spring context through {@link SpringUtil}.
     *
     * @param taskId     id of the assignment task to execute
     * @param schedule   schedule value passed through to the newly created job record
     * @param keyName    key name passed through to the newly created job record
     * @param configType configuration type; {@link ConfigTypeEnum#ASSIGNMENT_CONFIG} selects the
     *                   regular migration service, any other value selects the API migration service
     */
    public ExecuteJobTaskRunnable(Long taskId, Integer schedule, String keyName, Integer configType) {
        this.assignmentTaskDAO = SpringUtil.getBean(AssignmentTaskDAO.class);
        this.assignmentConfigDAO = SpringUtil.getBean(AssignmentConfigDAO.class);
        this.apiAssignmentConfigDAO = SpringUtil.getBean(APIAssignmentConfigDAO.class);
        this.assignmentJobDAO = SpringUtil.getBean(AssignmentJobDAO.class);
        this.readerTaskExecutor = SpringUtil.getBean(
                ExecutorConfig.TASK_READ_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
        this.writerTaskExecutor = SpringUtil.getBean(
                ExecutorConfig.TASK_WRITE_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
        this.apiTaskExecutor = SpringUtil.getBean(
                ExecutorConfig.TASK_API_EXECUTOR_BEAN_NAME, AsyncTaskExecutor.class);
        this.taskId = taskId;
        this.schedule = schedule;
        this.keyName = keyName;
        this.configType = configType;
    }

    /**
     * Requests that the current run stops: raises the interrupted flag and, if a
     * {@link MigrationService} has already been created, forwards the interrupt to it.
     */
    public void interrupt() {
        this.interrupted = true;
        // Read the volatile field once so the null check and the call see the same instance.
        MigrationService runningService = this.migrationService;
        if (null != runningService) {
            runningService.interrupt();
        }
        // NOTE(review): apiMigrationService is never interrupted here; whether
        // APIMigrationService offers an interrupt operation is not visible from this file - confirm.
    }

    /**
     * Creates the job record, waits for the per-task lock (giving up if interrupted while
     * waiting), executes the job and releases the lock.
     *
     * @throws RuntimeException wrapping any failure that occurs while obtaining the lock
     */
    @Override
    public void run() {
        AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
                .newAssignmentJob(taskId, schedule, keyName);
        MdcKeyValue mdcKeyValue = new MdcKeyValue(MDC_KEY, assignmentJobEntity.getId().toString());

        try {
            ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
            while (!lock.tryLock(1, TimeUnit.SECONDS)) {
                if (interrupted) {
                    log.info("Quartz task id:{} interrupted when get lock", taskId);
                    return;
                }
                TimeUnit.SECONDS.sleep(1);
            }

            try {
                executeJob(assignmentJobEntity, mdcKeyValue);
            } catch (Exception e) {
                // Only reached when bookkeeping itself fails (e.g. persisting the job result);
                // kept non-fatal as before, but reported through the logger.
                log.error("Unexpected error while executing job [taskId={},jobId={}]",
                        taskId, assignmentJobEntity.getId(), e);
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Thread interrupted while waiting for lock of task id:{}", taskId, e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.error("Failed to obtain lock for task id:{}", taskId, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Loads the task, runs the migration matching {@code configType} and records the outcome
     * on the job record. Must be called while holding the per-task lock.
     *
     * <p>Loading the task happens inside the guarded section so that a missing task or a DAO
     * failure is recorded as a failed job instead of leaving the job record unfinished.
     */
    private void executeJob(AssignmentJobEntity assignmentJobEntity, MdcKeyValue mdcKeyValue) {
        log.info("Execute Job, and task id is : {} , job id is: {}",
                taskId, assignmentJobEntity.getId());

        String taskName = null;
        try {
            AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
            if (Objects.isNull(task)) {
                // The task may have been deleted by the user before this run started.
                throw new IllegalStateException("Assignment task not found, taskId=" + taskId);
            }
            taskName = task.getName();

            // NOTE(review): the task content is logged in full; if it carries connection
            // credentials they end up in the log - confirm and mask if necessary.
            log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties：{}",
                    task.getId(),
                    task.getName(),
                    task.getContent());

            DbswtichPropertiesConfiguration properties = JsonUtils.toBeanObject(
                    task.getContent(), DbswtichPropertiesConfiguration.class);

            if (ConfigTypeEnum.ASSIGNMENT_CONFIG.getValue().equals(configType)) {
                handleAssignmentConfig(task.getId(), assignmentConfigDAO, properties);
                MigrationService service =
                        new MigrationService(properties, readerTaskExecutor, writerTaskExecutor);
                migrationService = service;
                // Actually execute the job.
                service.setMdcKeyValue(mdcKeyValue);
                service.run();
            } else {
                // Original author's note: the API parameter passing here still needs rework.
                handleAPIAssignmentConfig(task.getId(), apiAssignmentConfigDAO, properties);
                APIMigrationService service = new APIMigrationService(
                        properties, readerTaskExecutor, writerTaskExecutor, apiTaskExecutor);
                apiMigrationService = service;
                // Actually execute the job.
                service.setMdcKeyValue(mdcKeyValue);
                service.run();
            }

            if (interrupted) {
                // Leave the in-memory status untouched; persistJobResult() still runs in finally.
                log.info("Quartz task id:{} interrupted when prepare stage", taskId);
                return;
            }

            assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
            log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
                    task.getId(), assignmentJobEntity.getId(), task.getName());
        } catch (Throwable e) {
            assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
            assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
            // The trailing throwable is logged with its stack trace by SLF4J.
            log.error("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}, Message: {}",
                    taskId, assignmentJobEntity.getId(), taskName, e.getMessage(), e);
        } finally {
            persistJobResult(assignmentJobEntity);
        }
    }

    /**
     * Writes finish time, error log and status of the finished run back to the stored job record.
     * A status of {@link JobStatusEnum#CANCEL} already present on the stored record is preserved.
     */
    private void persistJobResult(AssignmentJobEntity assignmentJobEntity) {
        AssignmentJobEntity latestJobEntity = assignmentJobDAO.getById(assignmentJobEntity.getId());
        if (Objects.isNull(latestJobEntity)) {
            // The user may have cancelled the task manually and then deleted both the task and
            // this job, in which case the record can no longer be found.
            return;
        }
        latestJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
        latestJobEntity.setErrorLog(assignmentJobEntity.getErrorLog());
        // Value comparison: '!=' would compare references if both sides are boxed.
        if (!Objects.equals(JobStatusEnum.CANCEL.getValue(), latestJobEntity.getStatus())) {
            latestJobEntity.setStatus(assignmentJobEntity.getStatus());
        }
        assignmentJobDAO.updateSelective(latestJobEntity);
    }

    /**
     * Applies the stored assignment configuration of the task to {@code properties} and, when
     * the configuration is still flagged as first run, clears that flag in the database.
     */
    private void handleAssignmentConfig(Long taskId, AssignmentConfigDAO assignmentConfigDAO, DbswtichPropertiesConfiguration properties) {
        AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO.getByAssignmentTaskId(taskId);
        applyConfigProperties(properties, assignmentConfigEntity.getTargetDropTable(),
                assignmentConfigEntity.getTargetOnlyCreate(), assignmentConfigEntity.getFirstFlag());
        if (assignmentConfigEntity.getFirstFlag()) {
            // Update only id + firstFlag so no other column is touched.
            AssignmentConfigEntity config = new AssignmentConfigEntity();
            config.setId(assignmentConfigEntity.getId());
            config.setFirstFlag(Boolean.FALSE);
            assignmentConfigDAO.updateSelective(config);
        }
    }

    /**
     * Applies the stored API assignment configuration of the task to {@code properties} and,
     * when the configuration is still flagged as first run, clears that flag in the database.
     */
    private void handleAPIAssignmentConfig(Long taskId, APIAssignmentConfigDAO apiassignmentConfigDAO, DbswtichPropertiesConfiguration properties) {
        APIAssignmentConfigEntity apiassignmentConfigEntity = apiassignmentConfigDAO.getByAssignmentTaskId(taskId);
        applyConfigProperties(properties, apiassignmentConfigEntity.getTargetDropTable(),
                apiassignmentConfigEntity.getTargetOnlyCreate(), apiassignmentConfigEntity.getFirstFlag());
        if (apiassignmentConfigEntity.getFirstFlag()) {
            // Update only id + firstFlag so no other column is touched.
            APIAssignmentConfigEntity config = new APIAssignmentConfigEntity();
            config.setId(apiassignmentConfigEntity.getId());
            config.setFirstFlag(Boolean.FALSE);
            apiassignmentConfigDAO.updateSelective(config);
        }
    }

    /**
     * Chooses the synchronisation mode, expressed as the triple
     * {@code (targetDrop, onlyCreate, changeDataSync)} that is written to the target properties:
     * <ul>
     *   <li>{@code targetDropTable} and {@code targetOnlyCreate} both false:
     *       {@code (false, false, true)} - synchronise data only</li>
     *   <li>{@code targetDropTable} and {@code targetOnlyCreate} both true:
     *       {@code (true, true, false)} - only create the physical tables on the target</li>
     *   <li>exactly one of them true and {@code firstFlag} true:
     *       {@code (true, false, false)} - first run: create tables, then load all data</li>
     *   <li>exactly one of them true and {@code firstFlag} false:
     *       {@code (false, false, true)} - later runs: synchronise changed data</li>
     * </ul>
     */
    private void applyConfigProperties(DbswtichPropertiesConfiguration targetProperties, boolean targetDropTable,
                                       boolean targetOnlyCreate, boolean firstFlag) {
        if (!targetDropTable && !targetOnlyCreate) {
            setTargetProperties(targetProperties, false, false, true);
        } else if (targetDropTable && targetOnlyCreate) {
            setTargetProperties(targetProperties, true, true, false);
        } else if (firstFlag) {
            // First synchronisation: tables are created automatically, followed by a full load.
            setTargetProperties(targetProperties, true, false, false);
        } else {
            // Not the first run: tables presumably exist already, so only sync changed data.
            setTargetProperties(targetProperties, false, false, true);
        }
    }

    /**
     * Writes the three mode flags onto the target section of the given configuration.
     */
    private void setTargetProperties(DbswtichPropertiesConfiguration targetProperties, boolean targetDrop,
                                     boolean onlyCreate, boolean changeDataSync) {
        targetProperties.getTarget().setTargetDrop(targetDrop);
        targetProperties.getTarget().setOnlyCreate(onlyCreate);
        targetProperties.getTarget().setChangeDataSync(changeDataSync);
    }
}
